package fun.easycode.datastream;

import cn.hutool.core.collection.CollectionUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import fun.easycode.datastream.repository.Task;
import fun.easycode.datastream.repository.TaskRepository;
import fun.easycode.jointblock.util.ExceptionUtil;
import fun.easycode.jointblock.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Consumer for "start full-data task" messages: it reads task-start messages from the
 * message broker and then starts the referenced tasks.
 *
 * <p>Each message body is decoded as UTF-8 JSON into a {@code DataCompleteStartEntry}; the
 * task id carried by the entry is looked up in the {@link TaskRepository} and, when the
 * repository accepts the start request, the task's processor is run through a
 * {@code DataCompleteProducer}.
 *
 * @author xuzhen97
 */
@Slf4j
public class DataCompleteStartConsumer {
    private final DefaultMQPushConsumer consumer;
    private final DataCompleteExecutor dataCompleteExecutor;
    private final TaskRepository taskRepository;
    private final DataStreamProperties properties;

    /**
     * Creates the consumer and subscribes it to the configured topic. The consumer is not
     * started until {@link #start()} is called.
     *
     * @param dataCompleteExecutor executor handed to every {@code DataCompleteProducer}
     * @param taskRepository       task repository used to look up tasks and record their state
     * @param properties           data-stream configuration; the RocketMQ name server address
     *                             and the topic to subscribe to are read from it
     * @throws MQClientException if the subscription cannot be created
     */
    public DataCompleteStartConsumer(DataCompleteExecutor dataCompleteExecutor, TaskRepository taskRepository, DataStreamProperties properties) throws MQClientException {
        this.dataCompleteExecutor = dataCompleteExecutor;
        this.taskRepository = taskRepository;
        this.properties = properties;
        consumer = createConsumer();
    }

    /**
     * Starts the underlying RocketMQ consumer.
     *
     * @throws MQClientException if the consumer fails to start
     */
    public void start() throws MQClientException {
        consumer.start();
    }

    /**
     * Builds the push consumer: sets the name server address, subscribes to all tags of the
     * configured topic and registers the orderly message listener.
     *
     * @return the configured, not yet started, consumer
     * @throws MQClientException if subscribing to the topic fails
     */
    private DefaultMQPushConsumer createConsumer() throws MQClientException {
        // Named differently from the field so the local does not shadow it.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("DataCompleteStartConsumer");

        pushConsumer.setNamesrvAddr(properties.getRocketMq().getNameSrvAddr());
        pushConsumer.subscribe(properties.getRocketMq().getDefaultCompleteTopic(), "*");

        pushConsumer.registerMessageListener((MessageListenerOrderly) (msgList, consumeOrderlyContext) -> {
            if (CollectionUtil.isEmpty(msgList)) {
                return ConsumeOrderlyStatus.SUCCESS;
            }
            // Decode the whole batch first so a malformed message fails before any task is started.
            List<DataCompleteStartEntry> data = msgList.stream()
                    .map(msg -> parseEntry(msg.getBody()))
                    .collect(Collectors.toList());
            // Start the tasks in message order.
            for (DataCompleteStartEntry entry : data) {
                if (entry == null) {
                    // NOTE(review): defensive guard — whether JacksonUtil.readValue can return
                    // null is not visible from this file; confirm against its implementation.
                    log.warn("skip task start message that could not be parsed into an entry");
                    continue;
                }
                startTask(entry.getTaskId());
            }

            return ConsumeOrderlyStatus.SUCCESS;
        });
        return pushConsumer;
    }

    /**
     * Decodes one message body into a start entry.
     *
     * @param body raw message body, expected to hold UTF-8 encoded JSON
     * @return the value produced by {@code JacksonUtil.readValue} for the decoded text
     */
    private DataCompleteStartEntry parseEntry(byte[] body) {
        // Explicit charset: the no-charset String constructor uses the platform default,
        // which would corrupt non-ASCII payloads on hosts that are not UTF-8.
        String json = new String(body, StandardCharsets.UTF_8);
        return JacksonUtil.readValue(json, new TypeReference<DataCompleteStartEntry>() {
        });
    }

    /**
     * Looks up the task and, if the repository accepts the start request, runs its processor.
     * The producer's completion callback records the outcome: failure with the joined error
     * messages when any were reported, otherwise completion with the total size.
     *
     * <p>Exceptions thrown by the repository or by {@code producer.run()} are not caught here
     * and propagate to the message listener.
     *
     * @param taskId id of the task to start
     */
    private void startTask(String taskId) {
        Task task = taskRepository.getById(taskId);
        if (task == null) {
            // Unknown id: skip instead of failing with a NullPointerException inside the listener.
            log.warn("task not found, skip start, taskId: {}", taskId);
            return;
        }
        if (!taskRepository.startTask(task.getId())) {
            // The repository refused the start; nothing to do for this message.
            return;
        }

        log.info("任务启动执行中, taskId: {}", taskId);
        IDataProcessor<?> processor = DataContext.getProcessor(task.getProcessorName());

        DataCompleteProducer<?> producer = new DataCompleteProducer<>(processor, dataCompleteExecutor, task.getParam(), (totalSize, errorMsgList) -> {
            if (errorMsgList.size() > 0) {
                String errorMsg = String.join(",", errorMsgList);
                taskRepository.failTask(task.getId(), errorMsg);
                log.warn("task id {} fail, errorMsgList: {}", task.getId(), errorMsg);
            } else {
                taskRepository.finishTask(task.getId(), totalSize);
                log.info("task id {} finish, totalSize: {}", task.getId(), totalSize);
            }
        });
        producer.run();
        log.info("任务执行成功, taskId: {}", taskId);
    }
}
